1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package rx.internal.operators;
18
19 import static org.mockito.Matchers.any;
20 import static org.mockito.Mockito.*;
21
22 import java.util.Arrays;
23
24 import org.junit.*;
25
26 import rx.*;
27 import rx.exceptions.TestException;
28 import rx.functions.Func1;
29 import rx.internal.util.UtilityFunctions;
30 import rx.observers.TestSubscriber;
31 ;
32
33 public class OperatorTakeUntilPredicateTest {
34 @Test
35 public void takeEmpty() {
36 @SuppressWarnings("unchecked")
37 Observer<Object> o = mock(Observer.class);
38
39 Observable.empty().takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
40
41 verify(o, never()).onNext(any());
42 verify(o, never()).onError(any(Throwable.class));
43 verify(o).onCompleted();
44 }
45 @Test
46 public void takeAll() {
47 @SuppressWarnings("unchecked")
48 Observer<Object> o = mock(Observer.class);
49
50 Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysFalse()).subscribe(o);
51
52 verify(o).onNext(1);
53 verify(o).onNext(2);
54 verify(o, never()).onError(any(Throwable.class));
55 verify(o).onCompleted();
56 }
57 @Test
58 public void takeFirst() {
59 @SuppressWarnings("unchecked")
60 Observer<Object> o = mock(Observer.class);
61
62 Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
63
64 verify(o).onNext(1);
65 verify(o, never()).onNext(2);
66 verify(o, never()).onError(any(Throwable.class));
67 verify(o).onCompleted();
68 }
69 @Test
70 public void takeSome() {
71 @SuppressWarnings("unchecked")
72 Observer<Object> o = mock(Observer.class);
73
74 Observable.just(1, 2, 3).takeUntil(new Func1<Integer, Boolean>() {
75 @Override
76 public Boolean call(Integer t1) {
77 return t1 == 2;
78 }
79 }).subscribe(o);
80
81 verify(o).onNext(1);
82 verify(o).onNext(2);
83 verify(o, never()).onNext(3);
84 verify(o, never()).onError(any(Throwable.class));
85 verify(o).onCompleted();
86 }
87 @Test
88 public void functionThrows() {
89 @SuppressWarnings("unchecked")
90 Observer<Object> o = mock(Observer.class);
91
92 Observable.just(1, 2, 3).takeUntil(new Func1<Integer, Boolean>() {
93 @Override
94 public Boolean call(Integer t1) {
95 throw new TestException("Forced failure");
96 }
97 }).subscribe(o);
98
99 verify(o).onNext(1);
100 verify(o, never()).onNext(2);
101 verify(o, never()).onNext(3);
102 verify(o).onError(any(TestException.class));
103 verify(o, never()).onCompleted();
104 }
105 @Test
106 public void sourceThrows() {
107 @SuppressWarnings("unchecked")
108 Observer<Object> o = mock(Observer.class);
109
110 Observable.just(1)
111 .concatWith(Observable.<Integer>error(new TestException()))
112 .concatWith(Observable.just(2))
113 .takeUntil(UtilityFunctions.alwaysFalse()).subscribe(o);
114
115 verify(o).onNext(1);
116 verify(o, never()).onNext(2);
117 verify(o).onError(any(TestException.class));
118 verify(o, never()).onCompleted();
119 }
120 @Test
121 public void backpressure() {
122 TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
123 @Override
124 public void onStart() {
125 request(5);
126 }
127 };
128
129 Observable.range(1, 1000).takeUntil(UtilityFunctions.alwaysFalse()).subscribe(ts);
130
131 ts.assertNoErrors();
132 ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
133 Assert.assertEquals(0, ts.getOnCompletedEvents().size());
134 }
135 }